package com.example.springbootdockertest.service.impl;

import cn.hutool.core.date.TimeInterval;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.example.springbootdockertest.entity.DO.OrderInfoDo;
import com.example.springbootdockertest.mapper.OrderInfoMapper;
import com.example.springbootdockertest.service.OrderInfoService;
import com.google.common.collect.Lists;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import javax.annotation.Resource;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * <p>
 * 服务实现类
 * </p>
 *
 * @author crush
 * @since 2022-04-03
 */
@Service
@Slf4j
public class OrderInfoServiceImpl extends ServiceImpl<OrderInfoMapper, OrderInfoDo> implements OrderInfoService {

    @Override
    public OrderInfoDo getOrder() {
        OrderInfoDo order = baseMapper.getOrder();
        return order;
    }

    // @Override
    // public String threadOrder(List<OrderInfoDo> employeeDOList) {
    //
    //     return null;
    // }

    /**
     * 测试多线程事务.
     *
     * @param employeeDOList
     */
    @Override
    @Transactional
    public String threadOrder(List<OrderInfoDo> employeeDOList) {
        try {
            //先做删除操作,如果子线程出现异常,此操作不会回滚
            this.getBaseMapper().delete(null);
            //获取线程池
            ExecutorService service = getThreadPool();
            //拆分数据,拆分5份
            List<List<OrderInfoDo>> lists = Lists.partition(employeeDOList, 5);
            //执行的线程
            Thread[] threadArray = new Thread[lists.size()];
            //监控子线程执行完毕,再执行主线程,要不然会导致主线程关闭,子线程也会随着关闭
            CountDownLatch countDownLatch = new CountDownLatch(lists.size());
            AtomicBoolean atomicBoolean = new AtomicBoolean(true);
            for (int i = 0; i < lists.size(); i++) {
                if (i == lists.size() - 1) {
                    atomicBoolean.set(false);
                }
                List<OrderInfoDo> list = lists.get(i);
                threadArray[i] = new Thread(() -> {
                    try {
                        //最后一个线程抛出异常
                        if (!atomicBoolean.get()) {
                            throw new RuntimeException("出现异常");
                        }
                        //批量添加,mybatisPlus中自带的batch方法
                        this.saveBatch(list);
                    } finally {
                        countDownLatch.countDown();
                    }

                });
            }
            for (int i = 0; i < lists.size(); i++) {
                service.execute(threadArray[i]);
            }
            //当子线程执行完毕时,主线程再往下执行
            countDownLatch.await();
            System.out.println("添加完毕");
        } catch (Exception e) {
            log.info("error", e);
            throw new RuntimeException("出现异常");
        }
        return null;
    }

    @Override
    @Transactional
    public String threadOrderTrue(List<OrderInfoDo> employeeDOList) throws SQLException {
        // 获取数据库连接,获取会话(内部自有事务)
        // SqlSession sqlSession = getSqlSession();
        // Connection connection = sqlSession.getConnection();
        try {
            // // 设置手动提交
            // connection.setAutoCommit(false);
            // //获取mapper
            // OrderInfoMapper infoMapper = sqlSession.getMapper(OrderInfoMapper.class);
            // //先做删除操作
            // infoMapper.delete(null);
            // this.remove(null);

            //获取执行器
            ExecutorService service = getWorkerThreadPool();
            List<Callable<Integer>> callableList = new ArrayList<>();
            //拆分list
            List<List<OrderInfoDo>> lists = Lists.partition(employeeDOList, 3000);
            AtomicBoolean atomicBoolean = new AtomicBoolean(true);
            CountDownLatch countDownLatch = new CountDownLatch(lists.size());
            // for (int i = 0; i < lists.size(); i++) {
            //     // if (i == lists.size() - 1) {
            //     //     atomicBoolean.set(false);
            //     // }
            //     List<OrderInfoDo> list = lists.get(i);
            //     //使用返回结果的callable去执行,
            //     Callable<Integer> callable = () -> {
            //         try {
            //             //让最后一个线程抛出异常
            //             // if (!atomicBoolean.get()) {
            //             //     throw new RuntimeException(Thread.currentThread().getName() + "出现异常");
            //             // }
            //             boolean b = this.saveBatch(list);
            //             return 1;
            //         } finally {
            //             countDownLatch.countDown();
            //         }
            //
            //     };
            //     callableList.add(callable);
            // }
            TimeInterval timeInterval = new TimeInterval();
            timeInterval.start();
            // this.saveBatch(employeeDOList);//单个执行
            ExecutorService executorService = newThreadPool();
            for (List<OrderInfoDo> list : lists) {
                executorService.submit(() -> {
                    this.saveBatch(list, list.size());
                    countDownLatch.countDown();
                });
                // Callable<Integer> callable = () -> {
                //     try {
                //         boolean b = this.saveBatch(list);
                //         return 1;
                //     } finally {
                //         countDownLatch.countDown();
                //     }
                //
                // };
                // callableList.add(callable);
            }
            //执行子线程
            // List<Future<Integer>> futures = executorService.invokeAll(callableList);
            // for (Future<Integer> future : futures) {
            //     //如果有一个执行不成功,则全部回滚
            //     if (future.get() <= 0) {
            //         throw new RuntimeException(Thread.currentThread().getName() + "出现异常");
            //     }
            // }
            countDownLatch.await();
            System.out.println("耗时" + timeInterval.intervalMs());
            // connection.commit();
            System.out.println("添加完毕");
        } catch (Exception e) {
            // connection.rollback();
            log.info("error", e);
            throw new RuntimeException("catch出现异常");
        }
        // finally {
        //     connection.close();
        // }
        return null;

    }

    @Override
    public List<OrderInfoDo> getOrderList() {
        QueryWrapper<OrderInfoDo> queryWrapper = new QueryWrapper<>();
        // queryWrapper.between("id", 1000, 1002);
        queryWrapper.exists("id", Arrays.asList(1000, 1001));
        List<OrderInfoDo> list = this.list(queryWrapper);
        return list;
    }


    private static int maxPoolSize = Runtime.getRuntime().availableProcessors();
    private volatile static ExecutorService executorService;

    public static ExecutorService getThreadPool() {
        if (executorService == null) {
            executorService = newThreadPool();
        }
        return executorService;
    }

    //worker线程池配置
    private static final int workerThreadNum = 10;
    private static final String workerThreadName = "order-Pool-%d";
    private static ExecutorService workerThreadPool;

    private ExecutorService getWorkerThreadPool() {
        if (workerThreadPool != null) {
            return workerThreadPool;
        }
        BasicThreadFactory build = new BasicThreadFactory.Builder().namingPattern(workerThreadName).daemon(true).build();

        workerThreadPool = new ThreadPoolExecutor(workerThreadNum
                , workerThreadNum
                , 0L
                , TimeUnit.SECONDS
                , new LinkedBlockingQueue<Runnable>()
                , build
                , new ThreadPoolExecutor.AbortPolicy());

        return workerThreadPool;
    }

    private static ExecutorService newThreadPool() {
        int queueSize = 500;
        int corePool = Math.min(5, maxPoolSize);
        return new ThreadPoolExecutor(10, 20, 10000L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<>(queueSize), new ThreadPoolExecutor.AbortPolicy());
    }

    @Resource
    private SqlSessionTemplate sqlSessionTemplate;

    public SqlSession getSqlSession() {
        SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();
        return sqlSessionFactory.openSession();
    }
}
